#include "config.h"

#include <sys/types.h>
#include <sys/wait.h>
#include <sys/un.h>
#include <errno.h>
#include <pthread.h>
#include <getopt.h>
#include <string.h>

#define DBG_SUBSYS S_LIBINTERFACE

#include <pthread.h>

#include "lichbd.h"
#include "vm.h"
#include "sock_unix.h"
#include "stor_root.h"
#include "../../storage/controller/volume_ctl.h"
#include "get_version.h"
#include "lich_md.h"
#include "rpc_proto.h"
#include "core.h"
#include "corenet.h"
#include "mem_cache.h"
#include "lichbd_rpc.h"
#include "job_dock.h"
#include "../../ynet/sock/sock_tcp.h"
#include "../../ynet/net/net_events.h"
#include "rpc_table.h"
#include "lichstor.h"
#include "analysis.h"
#include "dbg.h"

#define LICHBD_RPC_CMD_READ 1
#define LICHBD_RPC_CMD_WRITE 2
#define LICHBD_RPC_CMD_SNAP_READ 3
#define LICHBD_RPC_CMD_SNAP_DIFF 4

#if 1
#define USE_UNIX_DOMAIN
#endif

#if 0
#define LICHBD_CACHE_REUSE 1
#endif

#define CHECK_INTERVAL 30

/*
 * Request header exchanged between the lichbd client (lichbd_rpc_*) and the
 * server (__lichbd_srv_*).  It is appended raw with mbuffer_appendmem() and
 * parsed with mbuffer_get()/mbuffer_popmsg(), so its layout is part of the
 * wire protocol: do not reorder or resize fields.
 */
typedef struct {
        uint32_t magic;         /* YNET_PROTO_TCP_MAGIC, asserted by the server handlers */
        int cmd;                /* one of LICHBD_RPC_CMD_* */
        fileid_t snapid;        /* snapshot to read (SNAP_READ) / diff source (SNAP_DIFF) */
        fileid_t snapdst;       /* diff destination (SNAP_DIFF) */
        msgid_t msgid;          /* echoed back in lichbd_rpc_reply_t */
        off_t offset;           /* byte offset in the volume */
        size_t size;            /* I/O length; for WRITE this many payload bytes follow the header */
        int flag;               /* WRITE only; O_DIRECT is inspected by the cache-reuse write path */
        int localize;           /* READ only; 1 allows the server to localize the volume */
} lichbd_rpc_req_t;

/*
 * Reply header built by __lichbd_srv_reply_prep() and parsed by
 * lichbd_rpc_reply().  retval < 0 carries -errno; retval > 0 is the number of
 * data bytes that immediately follow the header (reads); 0 is plain success.
 */
typedef struct {
        uint32_t magic;
        int retval;
        msgid_t msgid;          /* msgid of the request being answered */
} lichbd_rpc_reply_t;

/*
 * Per-call client context registered in the rpc table by
 * __lichbd_rpc_setctx(): the task to resume when the reply arrives and the
 * latency value handed to the completion callback.
 */
typedef struct {
        task_t task;
        uint64_t latency;
} rpc_ctx_t;

/*
 * Server-side state for one client connection exporting one volume.
 * Freed by __lichbd_srv_close() when no request holds it, otherwise by the
 * last __lichbd_srv_put() once erase has been set.
 */
typedef struct lichbd_srv_session {
        fileid_t id;                    /* volume id resolved during the handshake */
        sockid_t sockid;                /* corenet socket the replies are sent on */
        time_t last_check;              /* last __lichbd_check refresh (cache-reuse mode) */
        mcache_entry_t *entry;          /* cached controller entry, or NULL */
        char pool[MAX_NAME_LEN];
        char export_name[128];          /* volume path inside pool */
        uint32_t ref;                   /* requests currently running on this session */
        char erase;                     /* set once the connection has been closed */
        char localized;                 /* stor_localize() has already been attempted */
        //char in_check;
} lichbd_srv_session_t;

typedef struct {
        char pool[MAX_NAME_LEN];
        char path[0];
} lichbd_path_t;

/* Listening sockets polled by the accept thread: TCP (gloconf.lichbd_port) and unix domain. */
static int __tcp_sd__;
static int __unix_sd__;
/* Passed to sock_unix_listen() together with __LISTEN_PATH__. */
static struct sockaddr_un __listen_addr__;
/* NOTE(review): not referenced in the visible part of this file. */
static struct sockaddr_un __conn_addr__;

static void __lichbd_srv_get(lichbd_srv_session_t *se);
static void __lichbd_srv_put(lichbd_srv_session_t *se);

/* Unix-domain socket path the server listens on. */
#define __LISTEN_PATH__ "/tmp/lich_rpc.socket"

/*
 * Look up the controller entry of @volid and take an mcache reference on it.
 *
 * The reference obtained from volume_ctl_get() is always dropped again with
 * volume_ctl_release(); on success the caller is left holding the mcache
 * reference (callers drop it with mcache_release()).
 *
 * Returns 0, or the error from volume_ctl_get()/mcache_ref().
 */
STATIC int __lichbd_get(const volid_t *volid, mcache_entry_t **_entry)
{
        int ret;
        mcache_entry_t *entry;

        ret = volume_ctl_get(volid, &entry);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = mcache_ref(entry);
        if (unlikely(ret))
                GOTO(err_put, ret);

        /* from here on the mcache reference is what keeps the entry */
        volume_ctl_release(entry);
        YASSERT(entry->ref);

        *_entry = entry;
        DINFO("get "CHKID_FORMAT" %p\n", CHKID_ARG(volid), entry);

        return 0;
err_put:
        volume_ctl_release(entry);
err_ret:
        return ret;
}

/*
 * Allocate a server session for volume @id.
 *
 * @_se:   out, the new session (ref 0, not erased, not localized)
 * @id:    volume id resolved from @pool/@name
 * @pool:  pool name, copied into the session
 * @name:  volume path inside @pool, copied into export_name
 * @entry: initial cached controller entry, or NULL
 *
 * @pool and @name originate from the client handshake, so their lengths are
 * checked against the fixed-size fields instead of blindly strcpy()ing them
 * (export_name is only 128 bytes).
 *
 * Returns 0, ENAMETOOLONG if a name does not fit, or the ymalloc() error.
 */
static int lichbd_srv_session_alloc(lichbd_srv_session_t **_se, const fileid_t *id,
                                    const char *pool, const char *name, mcache_entry_t *entry)
{
        int ret;
        void *ptr;
        lichbd_srv_session_t *nse;

        if (unlikely(strlen(name) >= sizeof(nse->export_name)
                     || strlen(pool) >= sizeof(nse->pool))) {
                DWARN("pool or volume name too long\n");
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }

        ret = ymalloc((void **)&ptr, sizeof(*nse));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        nse = ptr;
        /* zero everything: ref, erase, localized, and also sockid/last_check */
        memset(nse, 0x0, sizeof(*nse));
        nse->id = *id;
        nse->entry = entry;
        snprintf(nse->export_name, sizeof(nse->export_name), "%s", name);
        snprintf(nse->pool, sizeof(nse->pool), "%s", pool);

        *_se = nse;

        return 0;
err_ret:
        return ret;
}

/*
 * Write the handshake status word (0 or an errno value) to a raw socket
 * during connection setup.  Returns 0, or errno if write() failed.
 */
static int __lichbd_srv_send_reply(int fd, int retval)
{
        int ret;

        if (unlikely(write(fd, &retval, sizeof(retval)) < 0)) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Resolve the volume a client named in its handshake and allocate a session
 * for it (not yet attached to a core).
 *
 * @pool: pool name from the handshake
 * @name: volume path inside @pool
 * @_se:  out, the new session
 *
 * Returns 0, ENAMETOOLONG if @name does not fit in a path buffer, EPERM if
 * the path is not a volume, or the error from stor_lookup1(),
 * stor_getattr() or the session allocation.
 */
static int  __lichbd_srv_export_name(const char *pool, const char *name, lichbd_srv_session_t **_se)
{
        int ret;
        struct stat stbuf;
        fileid_t id;
        lichbd_srv_session_t *se;
        char path[MAX_NAME_LEN];

        DINFO("connect to pool %s path %s\n", pool, name);

        /*
         * Reject names that would be truncated: looking up a truncated path
         * could resolve a different volume than the one requested.
         */
        ret = snprintf(path, sizeof(path), "%s", name);
        if (unlikely(ret < 0 || (size_t)ret >= sizeof(path))) {
                DWARN("name %s too long\n", name);
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }

        ret = stor_lookup1(pool, path, &id);
        if (unlikely(ret)) {
                DWARN("name %s\n", name);
                GOTO(err_ret, ret);
        }

        if (id.type != __VOLUME_CHUNK__) {
                DWARN("name %s is not volume.\n", name);
                ret = EPERM;
                GOTO(err_ret, ret);
        }

        ret = stor_getattr(pool, &id, &stbuf);
        if (unlikely(ret)) {
                if (ret == ENOENT) {
                        DWARN("%s deleted\n", name);
                }
                GOTO(err_ret, ret);
        }

        /* store exactly the path that was looked up */
        ret = lichbd_srv_session_alloc(&se, &id, pool, path, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DINFO("connect to %s id:"CHKID_FORMAT"\n", name, CHKID_ARG(&id));

        *_se = se;

        return 0;
err_ret:
        return ret;
}

/*
 * Build the reply header for @req into a freshly initialised @buf.
 *
 * The header is zeroed first so no uninitialised bytes (struct padding, or
 * the previously unset magic field) go out on the wire; magic carries the
 * same value the requests use.
 *
 * Returns 0 or the mbuffer_init() error.
 */
static int __lichbd_srv_reply_prep(buffer_t *buf, int retval, const lichbd_rpc_req_t *req)
{
        int ret;
        lichbd_rpc_reply_t *rep;

        ret = mbuffer_init(buf, sizeof(*rep));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        rep = mbuffer_head(buf);
        memset(rep, 0x0, sizeof(*rep));
        rep->magic = YNET_PROTO_TCP_MAGIC;
        rep->msgid = req->msgid;
        rep->retval = retval;

        return 0;
err_ret:
        return ret;
}

/*
 * Queue a reply for @req on @sockid: the header from
 * __lichbd_srv_reply_prep(), followed by @_buf when it is non-NULL.
 *
 * Callers (e.g. __lichbd_srv_read) never free @_buf after this call, so the
 * payload is owned by this function from here on: it is passed to
 * corenet_tcp_send(), or freed here if the header cannot be built.
 *
 * Returns 0 or the header-preparation error.
 */
static int __lichbd_srv_reply(const sockid_t *sockid, int retval, const lichbd_rpc_req_t *req, buffer_t *_buf)
{
        int ret;
        buffer_t buf;

        ret = __lichbd_srv_reply_prep(&buf, retval, req);
        if (unlikely(ret))
                GOTO(err_free, ret);

        corenet_tcp_send(sockid, &buf, 0);
        if (_buf) {
                corenet_tcp_send(sockid, _buf, 0);
        }

        DBUG("__reply__\n");

        return 0;
err_free:
        /* nobody else will release the payload */
        if (_buf)
                mbuffer_free(_buf);
        return ret;
}

#if LICHBD_CACHE_REUSE
/*
 * Cache-reuse write path.  While the session holds a controller entry the
 * write goes straight through it; if that fails because the volume moved
 * (EREMCHG, ESTALE or ENOENT) the entry is dropped and the write is reissued
 * with stor_write().  Any other error is returned.
 */
static int __lichbd_srv_write__(lichbd_srv_session_t *ctx, const lichbd_rpc_req_t *req, buffer_t *buf)
{
        int ret;

        if (likely(ctx->entry)) {
                io_t io;

                io_init(&io, &ctx->id, NULL, req->offset, req->size,
                        (req->flag & O_DIRECT) ? __FILE_ATTR_DIRECT__ : 0);
                ret = volume_ctl_write_direct(ctx->entry, &io, buf, 1);
                if (likely(ret == 0))
                        return 0;

                if (ret != EREMCHG && ret != ESTALE && ret != ENOENT)
                        GOTO(err_ret, ret);

                /* entry is re-read after the direct write returns */
                if (ctx->entry) {
                        mcache_release(ctx->entry);
                        ctx->entry = NULL;
                        DWARN(CHKID_FORMAT" moved\n", CHKID_ARG(&ctx->id));
                }
        }

        ret = stor_write(ctx->pool, &ctx->id, buf, req->size, req->offset);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

#else  
/* Default write path: hand the payload straight to stor_write(). */
static int __lichbd_srv_write__(lichbd_srv_session_t *ctx, const lichbd_rpc_req_t *req, buffer_t *buf)
{
        int ret = stor_write(ctx->pool, &ctx->id, buf, req->size, req->offset);

        if (likely(ret == 0))
                return 0;

        GOTO(err_ret, ret);
err_ret:
        return ret;
}
#endif

/*
 * Serve a WRITE request.  On the first write of the session the volume is
 * localized (ENOSPC is logged and otherwise ignored); then the payload in
 * @buf is written and 0 or -errno is replied to the client.
 *
 * Returns 0, or the errno that was also sent back.
 */
static int __lichbd_srv_write(lichbd_srv_session_t *ctx, const lichbd_rpc_req_t *req, buffer_t *buf)
{
        int ret;
        const fileid_t *fileid = &ctx->id;

        YASSERT(req->magic == YNET_PROTO_TCP_MAGIC);

        schedule_task_setname("lichbd_srv_write");

        DBUG(CHKID_FORMAT" offset %llu size %llu\n", CHKID_ARG(fileid),
             (LLU)req->offset, (LLU)req->size);

        if (unlikely(!ctx->localized)) {
                DINFO("localize "CHKID_FORMAT" offset %llu size %llu\n", CHKID_ARG(fileid),
                      (LLU)req->offset, (LLU)req->size);

                ret = stor_localize(ctx->pool, fileid);
                if (ret) {
                        if (ret != ENOSPC)
                                GOTO(err_ret, ret);

                        DWARN(CHKID_FORMAT" offset %llu size %llu\n", CHKID_ARG(fileid),
                              (LLU)req->offset, (LLU)req->size);
                }

                /* attempted once per session, whatever the outcome */
                ctx->localized = 1;
        }

        ret = __lichbd_srv_write__(ctx, req, buf);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DBUG(CHKID_FORMAT" offset %llu size %llu finish\n", CHKID_ARG(fileid),
             (LLU)req->offset, (LLU)req->size);

        /* NOTE: buf->len is not checked against req->size (a zero-length buf was seen) */
        __lichbd_srv_reply(&ctx->sockid, 0, req, NULL);
        return 0;
err_ret:
        __lichbd_srv_reply(&ctx->sockid, -ret, req, NULL);
        return ret;
}

static void __lichbd_rpc_request_post_task(void *arg1, void *arg2, void *arg3, void *arg4)
{
        rpc_ctx_t *ctx = arg1;
        int retval = *(int *)arg2;
        buffer_t *buf = arg3;
        uint64_t latency = *(uint64_t *)arg4;

        ctx->latency = latency;

        schedule_resume(&ctx->task, retval, buf);
}

/*
 * Reserve an rpc-table slot for an outgoing request and bind it to the
 * calling task, so __lichbd_rpc_request_post_task() can resume it.
 *
 * @msgid: out, slot id to put into the request header
 * @ctx:   per-call context; ctx->task is set to the current task
 * @name:  label for the slot
 *
 * Returns 0 or the rpc_table_getsolt() error; a failure to arm the slot
 * afterwards ends in UNIMPLEMENTED(__DUMP__).
 */
static int __lichbd_rpc_setctx(msgid_t *msgid, rpc_ctx_t *ctx, const char *name)
{
        int ret;
        rpc_vm_ctx_t *vm_ctx = vm_self()->ctx;

        ret = rpc_table_getsolt(__rpc_table__, msgid, name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ctx->task = schedule_task_get();

        ret = rpc_table_setsolt(__rpc_table__, msgid, __lichbd_rpc_request_post_task, ctx,
                                &vm_ctx->sockid, NULL, _get_timeout() * 2);
        if (unlikely(ret)) {
                UNIMPLEMENTED(__DUMP__);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Client side of a WRITE: send the request header followed by @_buf to the
 * lichbd server and wait for the reply.
 *
 * @_buf:   payload, sent with BUFFER_KEEP (presumably left owned by the caller)
 * @size:   payload length placed in the header
 * @offset: byte offset in the volume
 * @flag:   open flags forwarded to the server
 *
 * Returns 0, or the error this task was resumed with.
 */
int lichbd_rpc_write(const buffer_t *_buf, size_t size, off_t offset, int flag)
{
        int ret;
        lichbd_rpc_req_t req;
        rpc_ctx_t ctx;
        msgid_t msgid;
        buffer_t buf;

        ret = __lichbd_rpc_setctx(&msgid, &ctx, "lichbd_rpc_write");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* the header goes out verbatim: clear unused fields instead of leaking stack bytes */
        memset(&req, 0x0, sizeof(req));
        req.magic = YNET_PROTO_TCP_MAGIC;
        req.cmd = LICHBD_RPC_CMD_WRITE;
        req.msgid = msgid;
        req.offset = offset;
        req.size = size;
        req.flag = flag;

        mbuffer_init(&buf, 0);
        mbuffer_appendmem(&buf, &req, sizeof(req));
        vm_send(&buf, 0);
        vm_send((void *)_buf, BUFFER_KEEP);

        ret = schedule_yield("lichbd_rpc_write", NULL, &ctx);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

#if LICHBD_CACHE_REUSE
/*
 * Cache-reuse read path.  While the session holds a controller entry the
 * read goes straight through it; if that fails because the volume moved
 * (EREMCHG, ESTALE or ENOENT) the entry is dropped and the read is reissued
 * with stor_read().  Any other error is returned.
 */
static int __lichbd_srv_read__(lichbd_srv_session_t *ctx, const lichbd_rpc_req_t *req, buffer_t *buf)
{
        int ret;

        if (likely(ctx->entry)) {
                io_t io;

                io_init(&io, &ctx->id, NULL, req->offset, req->size, 0);
                ret = volume_ctl_read_direct(ctx->entry, &io, buf, 1);
                if (likely(ret == 0))
                        return 0;

                if (ret != EREMCHG && ret != ESTALE && ret != ENOENT)
                        GOTO(err_ret, ret);

                /* entry is re-read after the direct read returns */
                if (ctx->entry) {
                        mcache_release(ctx->entry);
                        ctx->entry = NULL;
                        DWARN(CHKID_FORMAT" moved\n", CHKID_ARG(&ctx->id));
                }
        }

        ret = stor_read(ctx->pool, &ctx->id, buf, req->size, req->offset);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

#else  
/* Default read path: read req->size bytes at req->offset with stor_read(). */
static int __lichbd_srv_read__(lichbd_srv_session_t *ctx, const lichbd_rpc_req_t *req, buffer_t *buf)
{
        int ret = stor_read(ctx->pool, &ctx->id, buf, req->size, req->offset);

        if (likely(ret == 0))
                return 0;

        GOTO(err_ret, ret);
err_ret:
        return ret;
}
#endif


/*
 * Serve a READ request: optionally localize the volume, read req->size bytes
 * at req->offset and send them back.  The reply's retval is the number of
 * bytes returned, or -errno on failure.
 *
 * Localization is attempted at most once per session, and only when the
 * client asked for it (req->localize == 1) and the read lies beyond the first
 * 1 MiB.  ENOSPC from stor_localize() is logged and ignored.
 *
 * Returns 0, or the errno that was also sent back.
 */
static int __lichbd_srv_read(lichbd_srv_session_t *ctx, const lichbd_rpc_req_t *req)
{
        int ret;
        const fileid_t *fileid;
        buffer_t buf;

        ANALYSIS_BEGIN(0);

        YASSERT(req->magic == YNET_PROTO_TCP_MAGIC);

        schedule_task_setname("lichbd_srv_read");

        fileid = &ctx->id;

        DBUG(CHKID_FORMAT" offset %llu size %llu\n", CHKID_ARG(fileid),
             (LLU)req->offset, (LLU)req->size);

        if (unlikely(ctx->localized == 0 && req->offset > (1024 * 1024) && req->localize == 1)) {
                DINFO("localize "CHKID_FORMAT" offset %llu size %llu\n", CHKID_ARG(fileid),
                      (LLU)req->offset, (LLU)req->size);

                ret = stor_localize(ctx->pool, fileid);
                if (ret) {
                        if (ret == ENOSPC) {
                                DWARN(CHKID_FORMAT" offset %llu size %llu\n", CHKID_ARG(fileid),
                                      (LLU)req->offset, (LLU)req->size);
                        } else {
                                GOTO(err_ret, ret);
                        }
                }

                ctx->localized = 1;
        }

        mbuffer_init(&buf, 0);
        ret = __lichbd_srv_read__(ctx, req, &buf);
        if (unlikely(ret))
                GOTO(err_free, ret);

        DBUG(CHKID_FORMAT" offset %llu size %llu finish\n", CHKID_ARG(fileid),
             (LLU)req->offset, (LLU)req->size);

        /* buf is handed over to the reply path */
        __lichbd_srv_reply(&ctx->sockid, buf.len, req, &buf);

        ANALYSIS_QUEUE(0, IO_WARN, "lichbd_srv_read");

        return 0;
err_free:
        /* a failed read may have left data in buf; it is not sent, so release it */
        mbuffer_free(&buf);
err_ret:
        __lichbd_srv_reply(&ctx->sockid, -ret, req, NULL);
        return ret;
}

/*
 * Serve a SNAP_READ request: read req->size bytes at req->offset from
 * snapshot req->snapid and send them back.  The reply's retval is the number
 * of bytes returned, or -errno on failure.
 *
 * Returns 0, or the errno that was also sent back.
 */
static int __lichbd_srv_snap_read(lichbd_srv_session_t *ctx, const lichbd_rpc_req_t *req)
{
        int ret;
        const fileid_t *fileid;
        buffer_t buf;

        YASSERT(req->magic == YNET_PROTO_TCP_MAGIC);

        schedule_task_setname("lichbd_srv_snap_read");

        fileid = &ctx->id;

        DBUG(CHKID_FORMAT" offset %llu size %llu\n", CHKID_ARG(fileid),
             (LLU)req->offset, (LLU)req->size);

        mbuffer_init(&buf, 0);
        ret = stor_snapshot_read(ctx->pool, fileid, &req->snapid, &buf, req->size, req->offset);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

        DBUG(CHKID_FORMAT" offset %llu size %llu finish\n", CHKID_ARG(fileid),
             (LLU)req->offset, (LLU)req->size);

        /* buf is handed over to the reply path */
        __lichbd_srv_reply(&ctx->sockid, buf.len, req, &buf);

        return 0;
err_free:
        /* a failed read may have left data in buf; it is not sent, so release it */
        mbuffer_free(&buf);
        __lichbd_srv_reply(&ctx->sockid, -ret, req, NULL);
        return ret;
}

/*
 * Client side of a READ: ask the server for @size bytes at @offset and wait
 * for the reply.  @_buf is passed to schedule_yield() to receive the data the
 * reply resumes this task with.
 *
 * @localize: forwarded to the server; 1 allows it to localize the volume
 *
 * Returns 0, or the error this task was resumed with.
 */
int lichbd_rpc_read(buffer_t *_buf, size_t size, off_t offset, int localize)
{
        int ret;
        lichbd_rpc_req_t req;
        rpc_ctx_t ctx;
        msgid_t msgid;
        buffer_t buf;

        DBUG("read size %u offset %llu\n", (unsigned)size, (LLU)offset);

        ret = __lichbd_rpc_setctx(&msgid, &ctx, "lichbd_rpc_read");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* the header goes out verbatim: clear unused fields instead of leaking stack bytes */
        memset(&req, 0x0, sizeof(req));
        req.magic = YNET_PROTO_TCP_MAGIC;
        req.cmd = LICHBD_RPC_CMD_READ;
        req.msgid = msgid;
        req.offset = offset;
        req.size = size;
        req.localize = localize;

        mbuffer_init(&buf, 0);
        mbuffer_appendmem(&buf, &req, sizeof(req));
        vm_send(&buf, 0);

        ret = schedule_yield("lichbd_rpc_read", _buf, &ctx);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Client side of a SNAP_READ: ask the server for @size bytes at @offset of
 * snapshot @snapid and wait for the reply.  @_buf is passed to
 * schedule_yield() to receive the data the reply resumes this task with.
 *
 * Returns 0, or the error this task was resumed with.
 */
int lichbd_rpc_snap_read(const fileid_t *snapid, buffer_t *_buf, size_t size, off_t offset)
{
        int ret;
        lichbd_rpc_req_t req;
        rpc_ctx_t ctx;
        msgid_t msgid;
        buffer_t buf;

        DBUG("read size %u offset %llu\n", (unsigned)size, (LLU)offset);

        /* slot label previously read "lichbd_rpc_snap_write" (copy/paste slip) */
        ret = __lichbd_rpc_setctx(&msgid, &ctx, "lichbd_rpc_snap_read");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* the header goes out verbatim: clear unused fields instead of leaking stack bytes */
        memset(&req, 0x0, sizeof(req));
        req.magic = YNET_PROTO_TCP_MAGIC;
        req.cmd = LICHBD_RPC_CMD_SNAP_READ;
        req.msgid = msgid;
        req.snapid = *snapid;
        req.offset = offset;
        req.size = size;

        mbuffer_init(&buf, 0);
        mbuffer_appendmem(&buf, &req, sizeof(req));
        vm_send(&buf, 0);

        ret = schedule_yield("lichbd_rpc_snap_read", _buf, &ctx);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Serve a SNAP_DIFF request: compute the difference between snapshots
 * req->snapid and req->snapdst for req->size bytes at req->offset and send
 * it back.  The reply's retval is the number of bytes returned, or -errno.
 *
 * Returns 0, or the errno that was also sent back.
 */
static int __lichbd_srv_snap_diff(lichbd_srv_session_t *ctx, const lichbd_rpc_req_t *req)
{
        int ret;
        const fileid_t *fileid;
        buffer_t buf;

        YASSERT(req->magic == YNET_PROTO_TCP_MAGIC);

        /* task label previously read "lichbd_srv_snap_crement" */
        schedule_task_setname("lichbd_srv_snap_diff");

        fileid = &ctx->id;

        DBUG(CHKID_FORMAT" offset %llu size %llu\n", CHKID_ARG(fileid),
             (LLU)req->offset, (LLU)req->size);

        mbuffer_init(&buf, 0);
        ret = stor_snapshot_diff(ctx->pool, fileid, &req->snapid, &req->snapdst, &buf, req->size, req->offset);
        if (unlikely(ret)) {
                GOTO(err_free, ret);
        }

        DBUG(CHKID_FORMAT" offset %llu size %llu finish\n", CHKID_ARG(fileid),
             (LLU)req->offset, (LLU)req->size);

        /* buf is handed over to the reply path */
        __lichbd_srv_reply(&ctx->sockid, buf.len, req, &buf);

        return 0;
err_free:
        /* a failed diff may have left data in buf; it is not sent, so release it */
        mbuffer_free(&buf);
        __lichbd_srv_reply(&ctx->sockid, -ret, req, NULL);
        return ret;
}

/*
 * Client side of a SNAP_DIFF: ask the server for the difference between
 * snapshots @snapsrc and @snapdst over @size bytes at @offset and wait for
 * the reply.  @_buf is passed to schedule_yield() to receive the data the
 * reply resumes this task with.
 *
 * Returns 0, or the error this task was resumed with.
 */
int lichbd_rpc_snap_diff(const fileid_t *snapsrc, const fileid_t *snapdst,
                buffer_t *_buf, size_t size, off_t offset)
{
        int ret;
        lichbd_rpc_req_t req;
        rpc_ctx_t ctx;
        msgid_t msgid;
        buffer_t buf;

        DBUG("read size %u offset %llu\n", (unsigned)size, (LLU)offset);

        ret = __lichbd_rpc_setctx(&msgid, &ctx, "lichbd_rpc_snap_diff");
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* the header goes out verbatim: clear unused fields instead of leaking stack bytes */
        memset(&req, 0x0, sizeof(req));
        req.magic = YNET_PROTO_TCP_MAGIC;
        req.cmd = LICHBD_RPC_CMD_SNAP_DIFF;
        req.msgid = msgid;
        req.snapid = *snapsrc;
        req.snapdst = *snapdst;
        req.offset = offset;
        req.size = size;

        mbuffer_init(&buf, 0);
        mbuffer_appendmem(&buf, &req, sizeof(req));
        vm_send(&buf, 0);

        ret = schedule_yield("lichbd_rpc_snap_diff", _buf, &ctx);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/*
 * Task body for one client request queued by __lichbd_srv_newtask():
 * decode the header, dispatch on cmd while holding a session reference,
 * then release the request.
 */
static void __lichbd_srv_func(void *arg)
{
        int ret;
        lichbd_rpc_req_t req;
        rpc_request_t *rpc_request = arg;
        lichbd_srv_session_t *se = rpc_request->ctx;

        ret = mbuffer_popmsg(&rpc_request->buf, &req, sizeof(req));
        YASSERT(ret == 0);

        /* __lichbd_srv_close() defers the free while ref is non-zero */
        __lichbd_srv_get(se);

        switch (req.cmd) {
        case LICHBD_RPC_CMD_WRITE:
                /* what remains in rpc_request->buf is the write payload */
                __lichbd_srv_write(se, &req, &rpc_request->buf);
                break;
        case LICHBD_RPC_CMD_READ:
                __lichbd_srv_read(se, &req);
                break;
        case LICHBD_RPC_CMD_SNAP_READ:
                __lichbd_srv_snap_read(se, &req);
                break;
        case LICHBD_RPC_CMD_SNAP_DIFF:
                __lichbd_srv_snap_diff(se, &req);
                break;
        default:
                UNIMPLEMENTED(__DUMP__);
        }

        /* may free the session if the connection closed meanwhile */
        __lichbd_srv_put(se);

        mbuffer_free(&rpc_request->buf);
        mem_cache_free(MEM_CACHE_128, rpc_request);
}

/*
 * Move the next @len bytes of @buf (one complete request) into a newly
 * allocated rpc_request_t bound to session @ctx.
 */
static rpc_request_t *__lichbd_srv_request_pop(void *ctx, buffer_t *buf, size_t len)
{
        rpc_request_t *_req;

#ifdef HAVE_STATIC_ASSERT
        static_assert(sizeof(*_req)  < sizeof(mem_cache128_t), "lichbd_srv_t");
#endif

        _req = mem_cache_calloc(MEM_CACHE_128, 0);
        YASSERT(_req);

        mbuffer_init(&_req->buf, 0);
        mbuffer_pop(buf, &_req->buf, len);
        _req->ctx = ctx;

        return _req;
}

/*
 * core receive callback for a lichbd session (registered by
 * __lichbd_srv_attach()).
 *
 * Splits the bytes received so far in @buf into complete requests and spawns
 * one __lichbd_srv_func task per request.  A WRITE is consumed only once its
 * whole payload (req.size bytes after the header) has arrived; otherwise the
 * partial request stays in @buf for the next call.
 *
 * @ctx:    the lichbd_srv_session_t of this connection
 * @buf:    receive buffer (buffer_t *)
 * @_count: out, number of tasks spawned
 *
 * Always returns 0.
 */
static int __lichbd_srv_newtask(void *ctx, void *buf, int *_count)
{
        int count = 0;
        rpc_request_t *_req;
        lichbd_rpc_req_t req;
        buffer_t *_buf = buf;

        DBUG("__request__ %u\n", _buf->len);

        while (_buf->len >= sizeof(req)) {
                /* peek: the header stays in _buf until the request is popped */
                mbuffer_get(_buf, &req, sizeof(req));

                DBUG("cmd %u size %u buflen %u\n", req.cmd, (unsigned)(req.size + sizeof(req)),
                     (unsigned)_buf->len);

                switch (req.cmd) {
                case LICHBD_RPC_CMD_WRITE:
                        /*
                         * Wait for the full payload.  Compare with the bytes left
                         * after the header so a bogus req.size cannot overflow
                         * "req.size + sizeof(req)".
                         */
                        if (req.size > _buf->len - sizeof(req))
                                goto out;

                        _req = __lichbd_srv_request_pop(ctx, _buf, sizeof(req) + req.size);
                        schedule_task_new("lichbd_rpc_write", __lichbd_srv_func, _req, -1);
                        break;
                case LICHBD_RPC_CMD_READ:
                        _req = __lichbd_srv_request_pop(ctx, _buf, sizeof(req));
                        schedule_task_new("lichbd_rpc_read", __lichbd_srv_func, _req, -1);
                        break;
                case LICHBD_RPC_CMD_SNAP_READ:
                        _req = __lichbd_srv_request_pop(ctx, _buf, sizeof(req));
                        schedule_task_new("lichbd_rpc_snap_read", __lichbd_srv_func, _req, -1);
                        break;
                case LICHBD_RPC_CMD_SNAP_DIFF:
                        _req = __lichbd_srv_request_pop(ctx, _buf, sizeof(req));
                        schedule_task_new("lichbd_rpc_snap_diff", __lichbd_srv_func, _req, -1);
                        break;
                default:
                        DERROR("bad msgtype %u\n", req.cmd);
                        YASSERT(0);
                        /* never spin on, or consume, an unknown request */
                        goto out;
                }

                count++;
        }

out:
        if (count) {
                DBUG("new task %u\n", count);
        }

        *_count = count;
        return 0;
}

/* Release the cached controller reference, if any, and free session @se. */
static void __lichbd_srv_free(lichbd_srv_session_t *se)
{
        if (se->entry != NULL) {
                mcache_release(se->entry);
                se->entry = NULL;
        }

        DINFO("session %s/"CHKID_FORMAT" free\n", se->export_name,
              CHKID_ARG(&se->id));

        yfree((void **)&se);
}

/* Pin @se for one in-flight request; balanced by __lichbd_srv_put(). */
static void __lichbd_srv_get(lichbd_srv_session_t *se)
{
        se->ref += 1;
}

/*
 * Drop one reference on @se; the last put frees the session if the
 * connection has already been closed (erase set).
 */
static void __lichbd_srv_put(lichbd_srv_session_t *se)
{
        se->ref--;

        if (se->ref == 0 && se->erase)
                __lichbd_srv_free(se);
}

/*
 * Connection-closed callback registered with core_attach(): mark the session
 * erased and free it now if idle; otherwise the last __lichbd_srv_put()
 * frees it.
 */
static void __lichbd_srv_close(void *arg)
{
        lichbd_srv_session_t *se = arg;

        DINFO("session %s "CHKID_FORMAT" closed\n", se->export_name,
              CHKID_ARG(&se->id));

        se->erase = 1;

        if (se->ref == 0) {
                DINFO("there is no busy cmd to wait, free now\n");
                __lichbd_srv_free(se);
                return;
        }

        /* requests are still running on this session: defer the free */
        DWARN("wanting thread's job done: nr(%u)\n", se->ref);
}

#if LICHBD_CACHE_REUSE
/*
 * Refresh task spawned by __lichbd_check() (cache-reuse mode): keep
 * ctx->entry in sync with the volume controller.
 *
 * - No entry held: try __lichbd_get(); on success the session starts using
 *   the returned entry.
 * - Entry held: look it up again; EREMCHG drops our reference, success
 *   asserts the entry is unchanged and drops the extra reference just taken.
 *
 * Ends with __lichbd_srv_put() to balance the get taken by __lichbd_check()
 * before spawning this task.
 */
static void  __lichbd_check__(void *arg)
{
        int ret;
        mcache_entry_t *tmp;
        lichbd_srv_session_t *ctx = arg;

        DINFO("session check\n");

        if (ctx->entry == NULL) {
                DINFO(CHKID_FORMAT" not localized, need check\n", CHKID_ARG(&ctx->id));

                ret = __lichbd_get(&ctx->id, &tmp);
                if (unlikely(ret)) {
                        /* failures only log; the session keeps going through stor_* */
                        if (ret == EREMCHG) {
                                DWARN(""CHKID_FORMAT" not localized\n", CHKID_ARG(&ctx->id));
                        } else {
                                DWARN(""CHKID_FORMAT" check fail %u %s\n", CHKID_ARG(&ctx->id),
                                      ret, strerror(ret));
                        }
                } else {
                        /* keep the reference __lichbd_get() returned */
                        ctx->entry = tmp;
                        DINFO(CHKID_FORMAT" localized\n", CHKID_ARG(&ctx->id));
                }
        } else {
                DINFO(CHKID_FORMAT" localize check\n", CHKID_ARG(&ctx->id));

                ret = __lichbd_get(&ctx->id, &tmp);
                if (unlikely(ret)) {
                        if (ret == EREMCHG) {
                                DWARN("release "CHKID_FORMAT" localize\n", CHKID_ARG(&ctx->id));
                                if (ctx->entry) {
                                        mcache_release(ctx->entry);//release previous reference
                                        ctx->entry = NULL;
                                }
                        } else {
                                DWARN(""CHKID_FORMAT" check fail %u %s\n", CHKID_ARG(&ctx->id),
                                      ret, strerror(ret));
                        }
                } else {
                        /* NOTE(review): fires if ctx->entry changed while __lichbd_get() ran — confirm that cannot happen */
                        YASSERT(ctx->entry == tmp);
                        mcache_release(tmp);//drop the extra reference taken above
                        DINFO(CHKID_FORMAT" localized\n", CHKID_ARG(&ctx->id));
                }
        }

        __lichbd_srv_put(ctx);
}

/*
 * Last callback passed to core_attach() (cache-reuse mode): once more than
 * CHECK_INTERVAL seconds have passed since the previous run, spawn
 * __lichbd_check__() to refresh ctx->entry.  A session reference is taken for
 * the spawned task, which drops it when done.
 */
static void __lichbd_check(void *arg)
{
        time_t now = gettime();
        lichbd_srv_session_t *ctx = arg;

        if ((now - ctx->last_check) >  CHECK_INTERVAL) {
                ctx->last_check = now;
                __lichbd_srv_get(ctx);
                /* task label previously read "iscsi_check", which misattributed it */
                schedule_task_new("lichbd_check", __lichbd_check__, ctx, -1);
        }
}

/*
 * core_request() trampoline used by __lichbd_entry(): unpack
 * (volid, mcache_entry_t **) from @ap and run __lichbd_get() on the target core.
 *
 * NOTE(review): va_end() is called here on a va_list received from the
 * caller, while the C standard pairs va_end with va_start/va_copy in the same
 * function — confirm this matches how core_request() hands over @ap.
 */
STATIC int __lichbd_entry__(va_list ap)
{
        const volid_t *volid = va_arg(ap, volid_t *);
        mcache_entry_t **_cent = va_arg(ap, mcache_entry_t **);

        va_end(ap);

        return __lichbd_get(volid, _cent);
}

/*
 * Try to take an mcache reference for @volid by running __lichbd_get() on the
 * core selected by core_hash(volid).  On success *_entry is set; on failure
 * it is left untouched and only a warning is logged.
 */
static void __lichbd_entry(const volid_t *volid, mcache_entry_t **_entry)
{
        int ret;
        mcache_entry_t *tmp;

        ret = core_request(core_hash(volid), -1, "lichbd_attach", __lichbd_entry__,
                           volid, &tmp);
        if (likely(ret == 0)) {
                *_entry = tmp;
                DINFO(CHKID_FORMAT" localized\n", CHKID_ARG(volid));
                return;
        }

        if (ret == EREMCHG) {
                DWARN(""CHKID_FORMAT" not localized\n", CHKID_ARG(volid));
        } else {
                DWARN(""CHKID_FORMAT" check fail %u %s\n", CHKID_ARG(volid),
                      ret, strerror(ret));
        }
}

#else

/* Cache reuse disabled: there is no cached entry to refresh. */
static void __lichbd_check(void *arg)
{
        (void) arg;
}

/* Cache reuse disabled: sessions never hold a controller entry. */
static void __lichbd_entry(const chkid_t *chkid, mcache_entry_t **_entry)
{
        *_entry = NULL;
        (void) chkid;
}

#endif

/*
 * Bind the accepted socket to session @ctx and register it with a core.
 *
 * @ctx:  session created by __lichbd_srv_export_name()
 * @sd:   accepted socket
 * @addr: peer IPv4 address (a fixed placeholder for unix-domain peers)
 *
 * The core is picked with core_hash(&ctx->id), the id the session actually
 * serves (and the same id __lichbd_entry() is given).  Previously the name
 * was looked up a second time and that id hashed instead, so a mismatch
 * could place the session on a core unrelated to its volume.
 *
 * On failure ctx->entry may already be set; the caller releases it.
 * Returns 0 or the core_attach() error.
 */
static int  __lichbd_srv_attach(lichbd_srv_session_t *ctx, int sd, uint32_t addr)
{
        int ret, hash;

        hash = core_hash(&ctx->id);

        ctx->sockid.sd = sd;
        ctx->sockid.seq = _random();
        ctx->sockid.type = SOCKID_CORENET;
        ctx->sockid.addr = addr;
        __lichbd_entry(&ctx->id, &ctx->entry);

        ret = core_attach(hash, &ctx->sockid, "lichbd_srv", ctx,
                          __lichbd_srv_newtask, __lichbd_srv_close, __lichbd_check);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

static int __lichbd_srv_accept__(int in_sd)
{
        int ret, sd, retry = 0;
        lichbd_srv_session_t *se;
        socklen_t alen;
        lichbd_path_t *_path;
        char buf[MAX_BUF_LEN];
        uint32_t addr;

        if (in_sd == __unix_sd__) {
                struct sockaddr_un sin;
                _memset(&sin, 0, sizeof(sin));
                alen = sizeof(struct sockaddr_un);

                sd = accept(in_sd, &sin, &alen);
                if (sd < 0 ) {
                        ret = errno;
                        GOTO(err_ret, ret);
                }

                addr = 123;
                DBUG("accept unix connection\n");
        } else {
                YASSERT(in_sd == __tcp_sd__);
                struct sockaddr_in sin;
                _memset(&sin, 0, sizeof(sin));
                alen = sizeof(struct sockaddr_in);

                sd = accept(in_sd, &sin, &alen);
                if (sd < 0 ) {
                        ret = errno;
                        GOTO(err_ret, ret);
                }

                addr = sin.sin_addr.s_addr;

                DBUG("accept tcp connection\n");
        }

        ret = sock_poll_sd(sd, 1000 * 1000, POLLIN);
        if (unlikely(ret))
                GOTO(err_fd, ret);

        ret = recv(sd, buf, MAX_BUF_LEN, 0);
        if (ret < 0) {
                ret = errno;
                GOTO(err_fd, ret);
        }

        _path = (lichbd_path_t *)buf;

retry:
        ret = __lichbd_srv_export_name(_path->pool, _path->path, &se);
        if (unlikely(ret)) {
                ret = _errno(ret);
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_rep, ret, retry, retry, 50, (100 * 1000));
                } else
                        GOTO(err_rep, ret);
        }

        DINFO("negotiation with %d\n", sd);

        ret = tcp_sock_tuning(sd, 1, 1);
        if (unlikely(ret))
                GOTO(err_free, ret);

        ret = __lichbd_srv_attach(se, sd, addr);
        if (unlikely(ret))
                GOTO(err_free, ret);

        __lichbd_srv_send_reply(sd, 0);

        return 0;
err_free:
        if (se->entry) {
                mcache_release(se->entry);
                se->entry = NULL;
        }
        yfree((void **)&se);
err_rep:
        __lichbd_srv_send_reply(sd, ret);
err_fd:
        close(sd);
err_ret:
        return ret;
}

/*
 * Accept-thread main loop: poll the unix and TCP listeners and run the
 * handshake (__lichbd_srv_accept__) for each one that is readable.  Poll
 * timeouts just restart the loop; any other poll error ends in
 * UNIMPLEMENTED(__DUMP__).
 */
static void *__lichbd_srv_accept(void *_arg)
{
        int ret, count, i;
        int fds[2];
        struct pollfd result[2];

        (void) _arg;
        DINFO("start...\n");

        for (;;) {
                fds[0] = __unix_sd__;
                fds[1] = __tcp_sd__;
                count = 2;

                ret = sock_poll_sd1(fds, 2, POLLIN, 1000 * 1000, result, &count);
                if (unlikely(ret)) {
                        if (ret != ETIMEDOUT && ret != ETIME)
                                GOTO(err_ret, ret);
                        continue;
                }

                DINFO("got new event\n");

                /* NOTE(review): both slots are scanned regardless of the count returned; confirm sock_poll_sd1() fills result[] completely */
                for (i = 0; i < 2; i++) {
                        if (!result[i].revents)
                                continue;

                        /* any event other than plain POLLIN (e.g. POLLERR) trips this assert */
                        YASSERT(result[i].revents == POLLIN);
                        __lichbd_srv_accept__(result[i].fd);
                }
        }

        return NULL;
err_ret:
        UNIMPLEMENTED(__DUMP__);
        return NULL;
}

static int __lichbd_srv_passive()
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;

        ret = sock_unix_listen(__LISTEN_PATH__, &__unix_sd__, &__listen_addr__);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DINFO("listen @ %s, sd %u\n", __LISTEN_PATH__, __unix_sd__);

        char port[MAX_BUF_LEN];
        snprintf(port, MAX_BUF_LEN, "%u", gloconf.lichbd_port);
        ret = tcp_sock_hostlisten(&__tcp_sd__, NULL, port,
                                  YNET_QLEN, YNET_RPC_BLOCK, 1);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);
 
        ret = pthread_create(&th, &ta, __lichbd_srv_accept, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Service bootstrap thread.
 *
 * Calls __lichbd_srv_passive() until it succeeds.  EADDRINUSE is retried
 * every 2 seconds; any other error ends the thread without starting the
 * service.  Always returns NULL.
 */
static void *__lichbd_srv_service(void *_arg)
{
        int ret;

        (void) _arg;
        DINFO("start...\n");

        while (1) {
                /*
                 * __lichbd_srv_passive() takes no parameters; passing an
                 * argument to it is undefined behaviour (C11 6.5.2.2p6).
                 */
                ret = __lichbd_srv_passive();
                if (unlikely(ret)) {
                        if (ret == EADDRINUSE) {
                                DWARN("addr in used\n");
                                sleep(2);
                                continue;
                        } else
                                GOTO(err_ret, ret);
                }

                DINFO("lichbd_srv service inited\n");

                break;
        }

        return NULL;
err_ret:
        return NULL;
}

/*
 * Start the lichbd server in the background.
 *
 * Spawns a detached thread running __lichbd_srv_service(), which sets up
 * the listening sockets.  Returns 0 once the thread is created, or the
 * pthread_create() error number.  Listener setup errors are not reported
 * here.
 */
int lichbd_srv_init()
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = pthread_create(&th, &ta, __lichbd_srv_service, NULL);
        /* release the attribute object whether or not creation succeeded */
        (void) pthread_attr_destroy(&ta);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/*
 * Drain complete replies from the current vm's receive buffer and post
 * each one to __rpc_table__.
 *
 * Each reply begins with a lichbd_rpc_reply_t header:
 *   - retval < 0: posted as error -retval, no payload;
 *   - retval > 0: retval payload bytes follow the header and are posted
 *     with it; if they have not all arrived yet, processing stops and the
 *     reply stays in the buffer for a later call;
 *   - retval == 0: posted as success with no payload.
 *
 * @_count: always set to 0 (see NOTE below).
 * Returns 0.
 */
int lichbd_rpc_reply(int *_count)
{
        int count = 0;
        lichbd_rpc_reply_t rep;
        buffer_t *buf, tmp;
        vm_t *vm = vm_self();

        DBUG("__request__\n");

        buf = &vm->recv_buf;
        while (buf->len >= sizeof(rep)) {
                /*
                 * Read the header without consuming it (it is popped
                 * separately below), so a reply whose payload is still
                 * incomplete can be left in the buffer untouched.
                 */
                mbuffer_get(buf, &rep, sizeof(rep));

                if (rep.retval < 0) {
                        mbuffer_popmsg(buf, &rep, sizeof(rep));
                        rpc_table_post(__rpc_table__, &rep.msgid, -rep.retval, NULL, 0);
                } else if (rep.retval > 0) {
                        /* read reply: wait until header + payload are all here */
                        if (rep.retval + sizeof(rep) > buf->len)
                                break;

                        mbuffer_popmsg(buf, &rep, sizeof(rep));

                        mbuffer_init(&tmp, 0);
                        mbuffer_pop(buf, &tmp, rep.retval);
                        rpc_table_post(__rpc_table__, &rep.msgid, 0, &tmp, 0);
                } else {
                        mbuffer_popmsg(buf, &rep, sizeof(rep));
                        rpc_table_post(__rpc_table__, &rep.msgid, 0, NULL, 0);
                }

                count++;
        }

        if (count) {
                DBUG("new task %u\n", count);
        }

        /*
         * NOTE(review): the number of replies posted is tracked in `count`,
         * but the caller is always told 0.  Confirm whether callers should
         * receive `count` instead.
         */
        *_count = 0;

        return 0;
}

/*
 * TCP fallback connection for lichbd_rpc_connect().
 *
 * Looks up @path in @pool, fetches the chunk info of the resulting file,
 * connects to the node owning diskid[0], and opens a TCP connection to
 * that host on gloconf.lichbd_port.  On success *sockid receives the new
 * socket, with type SOCKID_NORMAL.
 *
 * Returns 0 on success or an error number.  ENAMETOOLONG means the remote
 * host name did not fit the local buffer.
 */
static int __lichbd_rpc_connect(sockid_t *sockid, const char *pool, const char *path)
{
        int ret;
        char port[MAX_BUF_LEN], host[MAX_BUF_LEN];
        const char *rname;
        net_handle_t nh;
        chkinfo_t *chkinfo;
        char _chkinfo[CHKINFO_MAX];
        fileid_t fileid;

        ret = stor_lookup1(pool, path, &fileid);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        chkinfo = (void *)_chkinfo;
        ret = md_chunk_getinfo(pool, NULL, &fileid, chkinfo, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = network_connect(&chkinfo->diskid[0].id, NULL, 1, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* bounded copy: the name is stored in a fixed MAX_BUF_LEN buffer */
        rname = network_rname(&chkinfo->diskid[0].id);
        ret = snprintf(host, sizeof(host), "%s", rname);
        if (unlikely(ret < 0 || (size_t)ret >= sizeof(host))) {
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }

        snprintf(port, MAX_BUF_LEN, "%u", gloconf.lichbd_port);
        ret = tcp_sock_hostconnect(&nh, host, port, 0, 10, 0);
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        *sockid = nh.u.sd;
        sockid->type = SOCKID_NORMAL;

        return 0;
err_ret:
        return ret;
}


/*
 * Open a client connection to the lichbd server for volume @pool/@path.
 *
 * Tries the local unix-domain socket __LISTEN_PATH__ first.  If that is
 * refused, falls back to a TCP connection via __lichbd_rpc_connect().
 * After connecting, the following steps run in order:
 *   1. assign a fresh random sockid->seq;
 *   2. send a lichbd_path_t header (pool name) followed by the
 *      NUL-terminated path;
 *   3. read back an int status from the server;
 *   4. tune the socket with tcp_sock_tuning().
 *
 * Returns 0 on success or an error number:
 *   - ENAMETOOLONG if @pool does not fit the MAX_NAME_LEN field;
 *   - ECONNRESET if the status reply is short;
 *   - otherwise the server's status or the failing call's error.
 * On failure the socket is closed and nothing is leaked.
 */
int lichbd_rpc_connect(sockid_t *sockid, const char *pool, const char *path)
{
        int ret, retval;
        uint32_t seq;
        size_t pathlen, msglen;
        lichbd_path_t *_path;

        DINFO("connect to %s\n", path);

        /* pool is copied into a fixed-size field of the request below */
        if (unlikely(strlen(pool) >= MAX_NAME_LEN)) {
                ret = ENAMETOOLONG;
                GOTO(err_ret, ret);
        }

        ret =  sock_unix_connect(__LISTEN_PATH__, &sockid->sd, &__conn_addr__);
        if (unlikely(ret)) {
                if (ret == ECONNREFUSED) {
                        ret = __lichbd_rpc_connect(sockid, pool, path);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                } else
                        GOTO(err_ret, ret);
        } else {
                sockid->addr = 124;
                sockid->type = SOCKID_NORMAL;
        }

        /* pick a sequence number different from the previous one */
        while (1) {
                seq = _random();
                if (seq != sockid->seq) {
                        sockid->seq = seq;
                        break;
                }
        }

        pathlen = strlen(path);
        msglen = sizeof(lichbd_path_t) + pathlen + 1;
        ret = ymalloc((void **)&_path, msglen);
        if (unlikely(ret))
                GOTO(err_sd, ret);

        /* zero the fixed field so no stale heap bytes are sent */
        memset(_path->pool, 0, sizeof(_path->pool));
        strcpy(_path->pool, pool);
        memcpy(_path->path, path, pathlen + 1);

        ret = send(sockid->sd, _path, msglen, 0);
        if (ret < 0) {
                ret = errno;
                GOTO(err_free, ret);
        }

        ret = recv(sockid->sd, &retval, sizeof(retval), 0);
        if (ret < 0) {
                ret = errno;
                GOTO(err_free, ret);
        }

        if (ret != sizeof(retval)) {
                ret = ECONNRESET;
                GOTO(err_free, ret);
        }

        ret = retval;
        if (unlikely(ret)) {
                YASSERT(ret != EINVAL);
                GOTO(err_free, ret);
        }

        ret = tcp_sock_tuning(sockid->sd, 1, 1);
        if (unlikely(ret))
                GOTO(err_free, ret);    /* _path is still allocated here */

        DINFO("connect to %u ok\n", sockid->sd);

        yfree((void **)&_path);
        return 0;

err_free:
        yfree((void **)&_path);
err_sd:
        close(sockid->sd);
err_ret:
        return ret;
}

/*
 * Client-side rpc initialisation hook.
 *
 * Currently a no-op that always returns 0.  It does not call
 * rpc_table_private_init() or rpc_table_async_init(); that code was
 * compiled out with #if 0 and has been dropped.
 */
int lichbd_rpc_init(void)
{
        return 0;
}

/*
 * Client rpc health-check hook.  Performs no checks and always reports
 * success (0).
 */
int lichbd_rpc_check()
{
        const int status = 0;

        return status;
}
